package drds.data_propagate.parse.table_meta_data;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastsql.sql.repository.Schema;
import drds.data_propagate.driver.packets.server.ResultSetPacket;
import drds.data_propagate.entry.position.EntryPosition;
import drds.data_propagate.filter.EventFilter;
import drds.data_propagate.parse.DumperImpl;
import drds.data_propagate.parse.ddl.DdlResult;
import drds.data_propagate.parse.ddl.DruidDdlParser;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.table_meta_data._do_.MetaDataHistoryDo;
import drds.data_propagate.parse.table_meta_data._do_.MetaDataSnapshotDo;
import drds.data_propagate.parse.table_meta_data.dao.MetaDataDataHistoryDao;
import drds.data_propagate.parse.table_meta_data.dao.MetaDataDataSnapshotDao;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;


public class DatabaseTableMetaDataStore implements TableMetaDataStore {

    // Sentinel position used for the initial full dump: dumpTableMeta applies the dumped DDL at this
    // position and rollback stores the very first snapshot under it.
    public static final EntryPosition INIT_POSITION = new EntryPosition(-1L, "0", 0L, -2L);
    private static Logger log = LoggerFactory.getLogger(DatabaseTableMetaDataStore.class);
    // Matched (via find()) against exception messages in isUkDuplicateException to spot MySQL
    // duplicate-key errors.
    // NOTE(review): the trailing '*' is a regex quantifier on the quote character, not a wildcard, so
    // the pattern effectively only requires the text up to "for key '" - confirm this is intended.
    private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
    // Same purpose as above, for the message text used by H2.
    private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
    // Single daemon thread, shared by every instance of this class (static), that runs the periodic
    // snapshot job scheduled in init().
    private static ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "[scheduledExecutorService-tableName-meta-snapshot]");
            thread.setDaemon(true);
            return thread;
        }
    });
    // Write lock is taken in apply(); read lock is taken in find() and while copying the snapshot in
    // applySnapshotToDatabase().
    @Setter
    @Getter
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    // Flipped from false to true by the first init() call so that the init body runs only once.
    @Setter
    @Getter
    private AtomicBoolean initialized = new AtomicBoolean(false);
    // Task identifier assigned in init(); passed to the DAOs and put into the logging MDC.
    @Setter
    @Getter
    private String taskId;
    // In-memory table meta data; created in init() and replaced by a fresh instance on every rollback().
    @Setter
    @Getter
    private MemoryTableMetaDataStore memoryTableMetaDataStore;
    @Setter
    @Getter
    private volatile DumperImpl dumper; // connection used to query meta information
    // A table is considered only when blackFilter is null or does not match its "schema.table" name
    // and filter is null or matches it.
    @Setter
    @Getter
    private EventFilter filter;
    @Setter
    @Getter
    private EventFilter blackFilter;
    // Position of the last DDL accepted by apply(); handed to the scheduled snapshot job.
    @Setter
    @Getter
    private EntryPosition lastEntryPosition;
    // Set to true by apply(), reset to false when applySnapshotToDatabase() starts building a snapshot.
    @Setter
    @Getter
    private boolean hasNewDdl;
    // DAO for the per-DDL history records (insert in applyToDatabase, findByTimestamp in applyHistoryOnMemory).
    @Setter
    @Getter
    private MetaDataDataHistoryDao metaDataHistoryDao;
    // DAO for snapshot records (insert, findByTimestamp, deleteByTimestamp).
    @Setter
    @Getter
    private MetaDataDataSnapshotDao metaDataSnapshotDao;
    // Delay between scheduled snapshots, in hours; init() schedules nothing when this is <= 0.
    @Setter
    @Getter
    private int snapshotInterval = 24;
    // Expiry value in hours; the scheduled job converts it to seconds before calling snapshotExpire(int).
    @Setter
    @Getter
    private int snapshotExpire = 360;
    // Handle of the scheduled snapshot job; cancelled in destory().
    @Setter
    @Getter
    private ScheduledFuture<?> scheduleSnapshotFuture;

    // No-arg constructor; collaborators are injected through the generated setters.
    public DatabaseTableMetaDataStore() {

    }

    /**
     * Initializes this store for the given task. Only the first call does any work; later calls
     * return immediately.
     * <p>
     * On the first call the task id is stored, a fresh in-memory store is created and, when
     * {@code snapshotInterval} is positive, a job is scheduled every {@code snapshotInterval} hours
     * that writes a snapshot and, if one was written, expires old snapshots.
     *
     * @param taskId identifier of the task this store belongs to
     * @return always {@code true}
     */
    @Override
    public boolean init(final String taskId) {
        if (!initialized.compareAndSet(false, true)) {
            // Already initialized by an earlier call.
            return true;
        }

        this.taskId = taskId;
        this.memoryTableMetaDataStore = new MemoryTableMetaDataStore();

        if (snapshotInterval <= 0) {
            // Periodic snapshots are disabled.
            return true;
        }

        final Runnable periodicSnapshotJob = new Runnable() {

            @Override
            public void run() {
                // Step 1: write a snapshot; any failure is logged and treated as "nothing written".
                boolean snapshotWritten = false;
                try {
                    MDC.put("taskIdSequense", taskId);
                    snapshotWritten = applySnapshotToDatabase(lastEntryPosition, false);
                } catch (Throwable e) {
                    log.error("scheudle applySnapshotToDatabase faield", e);
                }

                // Step 2: expire old snapshots, but only after a successful write.
                try {
                    MDC.put("taskIdSequense", taskId);
                    if (snapshotWritten) {
                        snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
                    }
                } catch (Throwable e) {
                    log.error("scheudle snapshotExpire faield", e);
                }
            }
        };
        scheduleSnapshotFuture = scheduledExecutorService.scheduleWithFixedDelay(
                periodicSnapshotJob, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
        return true;
    }

    /**
     * Releases the resources held by this store, in this order: the in-memory store, the meta
     * dumper connection, and the scheduled snapshot job. Each step is skipped when the
     * corresponding field is {@code null}.
     */
    @Override
    public void destory() {
        if (memoryTableMetaDataStore != null) {
            memoryTableMetaDataStore.destory();
        }

        disconnectMetaDumper();

        if (scheduleSnapshotFuture != null) {
            // cancel(false): a run that is already in progress is not interrupted.
            scheduleSnapshotFuture.cancel(false);
        }
    }

    /**
     * Disconnects the meta dumper if one is set; an {@link IOException} is logged, not rethrown.
     */
    private void disconnectMetaDumper() {
        if (dumper == null) {
            return;
        }
        try {
            dumper.disconnect();
        } catch (IOException e) {
            log.error("ERROR # disconnect meta dumper for address:{}",
                    dumper.getConnector().getInetSocketAddress(), e);
        }
    }
    //

    /**
     * Applies one DDL statement: first to the in-memory store, then to the history table.
     * The whole operation runs under the write lock.
     *
     * @param entryPosition position of the DDL event
     * @param schemaName    schema in effect for the DDL
     * @param ddl           DDL text
     * @param extra         extra payload stored alongside the history record
     * @return the result of {@code applyToDatabase}
     * @throws RuntimeException if the in-memory store rejects the DDL
     */
    @Override
    public boolean apply(EntryPosition entryPosition, String schemaName, String ddl, String extra) {
        lock.writeLock().lock();
        try {
            // Record into the in-memory structure first.
            boolean appliedInMemory = memoryTableMetaDataStore.apply(entryPosition, schemaName, ddl, extra);
            if (!appliedInMemory) {
                throw new RuntimeException("apply decode memory is failed");
            }

            this.lastEntryPosition = entryPosition;
            this.hasNewDdl = true;
            // Persist every change as a history record.
            return applyToDatabase(entryPosition, schemaName, ddl, extra);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Rebuilds the in-memory meta data for the given position.
     * <p>
     * A fresh in-memory store is always created. If a stored snapshot is found for the position,
     * the history between that snapshot and the position is replayed on top of it. Otherwise the
     * tables of interest are dumped from the database and saved as the initial snapshot.
     *
     * @param entryPosition position to roll back to
     * @return {@code true} when a snapshot was found, or when the full dump succeeded and its
     * snapshot was stored; {@code false} otherwise
     */
    @Override
    public boolean rollback(EntryPosition entryPosition) {
        // Every rollback rebuilds the memory data from scratch.
        this.memoryTableMetaDataStore = new MemoryTableMetaDataStore();

        EntryPosition snapshotPosition = buildMemFromSnapshot(entryPosition);
        if (snapshotPosition != null) {
            applyHistoryOnMemory(snapshotPosition, entryPosition);
            return true;
        }

        // No stored data at all: this is the initial state, so dump all tables of interest.
        if (!dumpTableMeta(dumper, filter)) {
            return false;
        }
        // Store the dump as a snapshot to speed up later recovery.
        return applySnapshotToDatabase(INIT_POSITION, true);
    }

    /**
     * Returns the snapshot produced by the in-memory store. No lock is taken here.
     *
     * @return the map returned by {@code memoryTableMetaDataStore.snapshot()}; elsewhere in this
     * class its entries are replayed as (schema name, DDL text) pairs
     */
    @Override
    public Map<String, String> snapshot() {
        final Map<String, String> currentSnapshot = memoryTableMetaDataStore.snapshot();
        return currentSnapshot;
    }

    /**
     * Looks up a table's meta data in the in-memory store while holding the read lock.
     *
     * @param schemaName schema of the table
     * @param tableName  name of the table
     * @return whatever the in-memory store returns for this table
     */
    @Override
    public TableMetaData find(String schemaName, String tableName) {
        lock.readLock().lock();
        try {
            TableMetaData found = memoryTableMetaDataStore.find(schemaName, tableName);
            return found;
        } finally {
            lock.readLock().unlock();
        }
    }
    //

    /**
     * Dumps the table structures from the database during initialization.
     * <p>
     * For every database returned by {@code show databases}, the base tables are listed, filtered
     * through {@code blackFilter} and the given {@code filter}, and the
     * {@code show create table} output of each remaining table is applied to the in-memory store
     * at {@link #INIT_POSITION}.
     *
     * @param dumper connection used to run the queries
     * @param filter include filter on "schema.table" names; {@code null} accepts every table
     * @return {@code true} when the dump completed
     * @throws ParseException wrapping any {@link IOException} raised by a query
     */
    private boolean dumpTableMeta(DumperImpl dumper, final EventFilter filter) {
        try {
            ResultSetPacket resultSetPacket = dumper.query("show databases");
            List<String> schemaNameList = new ArrayList<String>();
            for (String schemaName : resultSetPacket.getValueList()) {
                schemaNameList.add(schemaName);
            }

            for (String schemaName : schemaNameList) {
                // Restrict the listing to base tables so that views are left out.
                resultSetPacket = dumper.query("show full tables from `" + schemaName + "` where Table_type = 'BASE TABLE'");
                List<String> tableNameList = new ArrayList<String>();
                for (String value : resultSetPacket.getValueList()) {
                    // The value list also carries the Table_type column of each row; skip it.
                    if ("BASE TABLE".equalsIgnoreCase(value)) {
                        continue;
                    }
                    String fullName = schemaName + "." + value;
                    if (blackFilter != null && blackFilter.filter(fullName)) {
                        continue;
                    }
                    if (filter == null || filter.filter(fullName)) {
                        tableNameList.add(value);
                    }
                }

                if (tableNameList.isEmpty()) {
                    continue;
                }

                // Batch one "show create table" statement per table.
                // Fix: the statement keyword is "table"; "show create tableName" is not valid SQL.
                StringBuilder sql = new StringBuilder();
                for (String tableName : tableNameList) {
                    sql.append("show create table `").append(schemaName).append("`.`").append(tableName).append("`;");
                }

                List<ResultSetPacket> resultSetPacketList = dumper.querys(sql.toString());
                for (ResultSetPacket createTablePacket : resultSetPacketList) {
                    // SHOW CREATE TABLE returns (Table, Create Table); the DDL is the second value.
                    if (createTablePacket.getValueList().size() > 1) {
                        String createTableSql = createTablePacket.getValueList().get(1);
                        memoryTableMetaDataStore.apply(INIT_POSITION, schemaName, createTableSql, null);
                    }
                }
            }

            return true;
        } catch (IOException e) {
            throw new ParseException(e);
        }
    }

    /**
     * Stores one DDL change as a history record.
     * <p>
     * The position fields and the schema are always recorded; the parsed schema/table/type, the
     * DDL text and {@code extra} are added only when the DDL parser returns at least one result.
     * A unique-key violation on insert is treated as a repeated position and only logged.
     *
     * @param entryPosition position of the DDL event
     * @param schemaName    schema in effect for the DDL
     * @param ddl           DDL text
     * @param extra         extra payload stored with the record
     * @return always {@code true} when no exception is thrown
     * @throws ParseException if populating or inserting the record fails for any reason other
     *                        than a duplicate key
     */
    private boolean applyToDatabase(EntryPosition entryPosition, String schemaName, String ddl, String extra) {
        // NOTE(review): applyHistoryOnMemory reads records back through getBinlogFileName() and
        // getBinlogEventOffest(), while the keys below are "binlogEventFileName" and
        // "binlogEventFileOffest". BeanUtils.populate skips keys without a matching property -
        // confirm MetaDataHistoryDo really exposes properties under these names.
        Map<String, String> map = new HashMap<String, String>();
        map.put("taskIdSequense", taskId);
        map.put("binlogEventFileName", entryPosition.getBinlogFileName());
        map.put("binlogEventFileOffest", String.valueOf(entryPosition.getBinlogFileOffset()));
        map.put("masterId", String.valueOf(entryPosition.getServerId()));
        map.put("binlogEventExecuteTimestamp", String.valueOf(entryPosition.getExecuteTimestamp()));
        map.put("useSchema", schemaName);
        // The former "map.isEmpty()" guard was removed: the map can never be empty at this point.

        // Only the first parse result is recorded.
        List<DdlResult> ddlResultList = DruidDdlParser.parse(ddl, schemaName);
        if (ddlResultList != null && !ddlResultList.isEmpty()) {
            DdlResult ddlResult = ddlResultList.get(0);
            map.put("sqlSchema", ddlResult.getSchemaName());
            map.put("sqlTable", ddlResult.getTableName());
            map.put("sqlType", ddlResult.getEventType().name());
            map.put("sql", ddl);
            map.put("extra", extra);
        }

        MetaDataHistoryDo metaDataHistoryDo = new MetaDataHistoryDo();
        try {
            BeanUtils.populate(metaDataHistoryDo, map);
            // Unique constraints on the table are expected to reject:
            // 1. a repeated binlog file + offset
            // 2. a repeated masterId + executeTimestamp
            metaDataHistoryDao.insert(metaDataHistoryDo);
        } catch (Throwable e) {
            if (isUkDuplicateException(e)) {
                // Ignore a position that has already been recorded.
                log.warn("dup apply for sql : " + ddl);
            } else {
                throw new ParseException("apply history decode dataBaseName failed caused by : " + e.getMessage(), e);
            }
        }
        return true;
    }

    /**
     * Writes a snapshot of the current in-memory meta data to the snapshot table.
     * <p>
     * Steps: copy the in-memory snapshot under the read lock; replay it into a temporary
     * in-memory store; compare every table that passes the filters with what the database
     * reports; insert the snapshot only if all comparisons succeed.
     *
     * @param entryPosition position the snapshot is stored under
     * @param init          {@code true} for the initial snapshot, which is taken even when no new
     *                      DDL has been seen
     * @return {@code true} when the snapshot was stored (or was already present); {@code false}
     * when there was nothing new to snapshot or a comparison failed
     * @throws ParseException if storing the snapshot fails for any reason other than a duplicate key
     */
    private boolean applySnapshotToDatabase(EntryPosition entryPosition, boolean init) {
        // Take a copy of the current snapshot.
        Map<String, String> schemaNameToDdlsMap = null;
        lock.readLock().lock();
        try {
            if (!init && !hasNewDdl) {
                // Periodic run: skip when no DDL has changed since the last snapshot.
                return false;
            }
            this.hasNewDdl = false;
            schemaNameToDdlsMap = memoryTableMetaDataStore.snapshot();
        } finally {
            lock.readLock().unlock();
        }

        // Replay the copy into a temporary store so the comparison does not touch the live one.
        MemoryTableMetaDataStore snapshotMemoryStore = new MemoryTableMetaDataStore();
        for (Map.Entry<String, String> entry : schemaNameToDdlsMap.entrySet()) {
            snapshotMemoryStore.apply(entryPosition, entry.getKey(), entry.getValue(), null);
        }

        // Compare the temporary in-memory view with the database, table by table.
        boolean compareAll = true;
        for (Schema schema : snapshotMemoryStore.getSchemaRepository().getSchemas()) {
            String schemaName = schema.getName();
            for (String tableName : schema.showTables()) {
                // Fix: build the name from schema.getName(); concatenating the Schema object
                // itself used its toString() and produced a name the filters cannot match.
                String fullName = schemaName + "." + tableName;
                if (blackFilter != null && blackFilter.filter(fullName)) {
                    continue;
                }
                if (filter != null && !filter.filter(fullName)) {
                    continue;
                }
                // issue : https://github.com/alibaba/canal/issues/1168
                // Filter again while generating the snapshot.
                if (!compareTableMetaDataDatabaseAndMemory(dumper, snapshotMemoryStore, schemaName, tableName)) {
                    compareAll = false; // at least one table failed verification
                }
            }
        }

        if (!compareAll) {
            log.error("compare failed , check log");
            return false;
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put("taskIdSequense", taskId);
        map.put("binlogEventFileName", entryPosition.getBinlogFileName());
        map.put("binlogEventFileOffest", String.valueOf(entryPosition.getBinlogFileOffset()));
        map.put("masterId", String.valueOf(entryPosition.getServerId()));
        map.put("binlogEventExecuteTimestamp", String.valueOf(entryPosition.getExecuteTimestamp()));
        map.put("data", JSON.toJSONString(schemaNameToDdlsMap));
        // The former "map.isEmpty()" guard was removed: the map can never be empty at this point.

        MetaDataSnapshotDo metaDataSnapshotDo = new MetaDataSnapshotDo();
        try {
            BeanUtils.populate(metaDataSnapshotDo, map);
            metaDataSnapshotDao.insert(metaDataSnapshotDo);
        } catch (Throwable e) {
            if (isUkDuplicateException(e)) {
                // Ignore a position that has already been stored.
                log.info("dup apply snapshot use readedIndex : " + entryPosition + " , just ignore");
            } else {
                throw new ParseException("apply failed caused by : " + e.getMessage(), e);
            }
        }
        return true;
    }

    /**
     * Compares the meta data of one table as held in the given in-memory store with what the
     * database reports through {@code show create table}.
     * <p>
     * If the first query attempt fails for any reason, the dumper is reconnected and the query is
     * retried once (see https://github.com/alibaba/canal/issues/724).
     *
     * @param dumper                   connection used to query the database
     * @param memoryTableMetaDataStore in-memory store to compare against
     * @param schemaName               schema of the table
     * @param tableName                name of the table
     * @return {@code true} when both sides match; {@code false} when they differ or when the
     * database reports error 1146 (table does not exist)
     * @throws ParseException wrapping the first failure when the retry also fails with an
     *                        {@link IOException} that is not error 1146
     */
    private boolean compareTableMetaDataDatabaseAndMemory(DumperImpl dumper,
                                                          MemoryTableMetaDataStore memoryTableMetaDataStore, final String schemaName, final String tableName) {
        TableMetaData tableMetaDataInMemory = memoryTableMetaDataStore.find(schemaName, tableName);

        TableMetaData tableMetaDataInDatabase = new TableMetaData();
        tableMetaDataInDatabase.setSchemaName(schemaName);
        tableMetaDataInDatabase.setTableName(tableName);
        String createTableSql = null;
        try {
            createTableSql = loadTableMetaDataFromDatabase(dumper, tableMetaDataInDatabase, schemaName, tableName);
        } catch (Throwable e) {
            try {
                // retry for broke pipe, see:
                // https://github.com/alibaba/canal/issues/724
                dumper.reconnect();
                createTableSql = loadTableMetaDataFromDatabase(dumper, tableMetaDataInDatabase, schemaName, tableName);
            } catch (IOException e1) {
                // StringUtils.contains is null-safe, so an exception without a message no longer
                // triggers a NullPointerException here. The retry failure is checked as well, so a
                // missing table is recognised whichever attempt reported it.
                boolean tableMissing = StringUtils.contains(e.getMessage(), "errorNumber=1146")
                        || StringUtils.contains(e1.getMessage(), "errorNumber=1146");
                if (tableMissing) {
                    log.error("tableName not exist in dataBaseName , pls check :" + getFullName(schemaName, tableName) + " , mem : "
                            + tableMetaDataInMemory);
                    return false;
                }
                throw new ParseException(e);
            }
        }

        boolean result = tableMetaDataCompare(tableMetaDataInMemory, tableMetaDataInDatabase);
        if (!result) {
            log.error("pls submit github issue, show create tableName ddl:" + createTableSql + " , compare failed . \n dataBaseName : "
                    + tableMetaDataInDatabase + " \n mem : " + tableMetaDataInMemory);
        }
        return result;
    }

    /**
     * Runs {@code show create table} for one table and, when the result carries the DDL, parses
     * it into the column list of {@code target}.
     *
     * @return the DDL text reported by the database, or {@code null} when the result had no DDL value
     */
    private String loadTableMetaDataFromDatabase(DumperImpl dumper, TableMetaData target,
                                                 String schemaName, String tableName) throws IOException {
        // Fix: the statement keyword is "table"; "show create tableName" is not valid SQL.
        ResultSetPacket resultSetPacket = dumper.query("show create table " + getFullName(schemaName, tableName));
        if (resultSetPacket.getValueList().size() <= 1) {
            return null;
        }
        // SHOW CREATE TABLE returns (Table, Create Table); the DDL is the second value.
        String createTableSql = resultSetPacket.getValueList().get(1);
        target.setColumnMetaDataList(TableMetaDataStoreManager.parseTableMetaData(schemaName, tableName, resultSetPacket));
        return createTableSql;
    }

    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    /**
     * Loads the snapshot the DAO returns for the given position's execute timestamp and replays
     * its DDL into the in-memory store.
     *
     * @param entryPosition position whose execute timestamp is used for the snapshot lookup
     * @return the position stored with the snapshot, or {@code null} when no snapshot exists or
     * the in-memory store rejects one of its entries
     * @throws ParseException wrapping anything thrown while loading or replaying
     */
    private EntryPosition buildMemFromSnapshot(EntryPosition entryPosition) {
        try {
            MetaDataSnapshotDo snapshotDo = metaDataSnapshotDao.findByTimestamp(taskId, entryPosition.getExecuteTimestamp());
            if (snapshotDo == null) {
                return null;
            }

            String storedMasterId = snapshotDo.getMasterId();
            String storedFileName = snapshotDo.getBinlogEventFileName();
            Long storedOffset = snapshotDo.getBinlogEventFileOffest();
            Long storedTimestamp = snapshotDo.getBinlogEventExecuteTimestamp();

            // Missing values fall back to: masterId -2, offset 0, timestamp 0.
            long offset = 0L;
            if (storedOffset != null) {
                offset = storedOffset;
            }
            long timestamp = 0L;
            if (storedTimestamp != null) {
                timestamp = storedTimestamp;
            }
            EntryPosition snapshotEntryPosition = new EntryPosition(
                    Long.valueOf(storedMasterId == null ? "-2" : storedMasterId),
                    storedFileName,
                    offset,
                    timestamp);

            // The data column is a JSON object; each entry is replayed as (schema name, DDL text).
            JSONObject ddlBySchema = JSON.parseObject(snapshotDo.getData());
            for (Map.Entry<String, Object> entry : ddlBySchema.entrySet()) {
                String schemaName = ObjectUtils.toString(entry.getKey());
                String ddlText = ObjectUtils.toString(entry.getValue());
                boolean applied = memoryTableMetaDataStore.apply(snapshotEntryPosition, schemaName, ddlText, null);
                if (!applied) {
                    return null;
                }
            }

            return snapshotEntryPosition;
        } catch (Throwable e) {
            throw new ParseException("apply failed caused by : " + e.getMessage(), e);
        }
    }

    /**
     * Replays onto the in-memory store the history records the DAO returns for the range between
     * the snapshot position's and the rollback position's execute timestamps.
     *
     * @param entryPosition         position of the snapshot the replay starts from
     * @param rollbackEntryPosition position the replay must not go past
     * @return {@code true} when the DAO returned {@code null} or a non-empty list that was fully
     * processed; {@code false} when the list was empty or the in-memory store rejected a record
     * @throws ParseException wrapping anything thrown while loading or replaying
     */
    private boolean applyHistoryOnMemory(EntryPosition entryPosition, EntryPosition rollbackEntryPosition) {
        try {
            List<MetaDataHistoryDo> historyRecords = metaDataHistoryDao.findByTimestamp(
                    taskId, entryPosition.getExecuteTimestamp(), rollbackEntryPosition.getExecuteTimestamp());
            if (historyRecords == null) {
                return true;
            }

            for (MetaDataHistoryDo record : historyRecords) {
                String storedMasterId = record.getMasterId();
                String storedFileName = record.getBinlogFileName();
                Long storedOffset = record.getBinlogEventOffest();
                Long storedTimestamp = record.getBinlogEventExecuteTimestamp();
                String useSchema = record.getUseSchema();
                String sqlText = record.getSql();

                // Missing values fall back to: masterId -2, offset 0, timestamp 0.
                long offset = 0L;
                if (storedOffset != null) {
                    offset = storedOffset;
                }
                long timestamp = 0L;
                if (storedTimestamp != null) {
                    timestamp = storedTimestamp;
                }
                EntryPosition historyPosition = new EntryPosition(
                        Long.valueOf(storedMasterId == null ? "-2" : storedMasterId),
                        storedFileName,
                        offset,
                        timestamp);

                // Skip records that lie after the rollback target.
                if (historyPosition.getExecuteTimestamp() > rollbackEntryPosition.getExecuteTimestamp()) {
                    continue;
                }
                // Within the same second, compare the positions themselves and skip a record that
                // is larger than the expected one.
                // NOTE(review): if getServerId() returns a boxed Long, this == compares
                // references rather than values - confirm the return type.
                boolean sameServer = rollbackEntryPosition.getServerId() == historyPosition.getServerId();
                if (sameServer && historyPosition.compareTo(rollbackEntryPosition) > 0) {
                    continue;
                }

                // Record into memory.
                boolean applied = memoryTableMetaDataStore.apply(historyPosition, useSchema, sqlText, null);
                if (!applied) {
                    return false;
                }
            }

            return !historyRecords.isEmpty();
        } catch (Throwable e) {
            throw new ParseException("apply failed", e);
        }
    }

    /**
     * Builds the backtick-quoted qualified name {@code `schema`.`table`}.
     *
     * @param schemaName schema part of the name
     * @param tableName  table part of the name
     * @return the quoted, dot-separated name
     */
    private String getFullName(String schemaName, String tableName) {
        return "`" + schemaName + "`" + "." + "`" + tableName + "`";
    }

    /**
     * Compares two table descriptions column by column.
     * <p>
     * Tables match when the schema and table names are equal ignoring case, the column counts
     * are equal and, for every column index: the names are equal ignoring case, the signedness is
     * the same, one column type contains the other once the sign suffix is removed (so
     * {@code int} matches {@code int(10)}), the nullability is the same, and the "primary key or
     * unique index" status is the same. Default values are deliberately not compared.
     *
     * @param sourceTableMetaData first table description
     * @param targetTableMetaData second table description
     * @return {@code true} when the two descriptions match; {@code false} otherwise, including
     * when either description or either column list is {@code null}
     */
    private boolean tableMetaDataCompare(TableMetaData sourceTableMetaData, TableMetaData targetTableMetaData) {
        // A missing side cannot be verified; report a mismatch rather than failing with an NPE.
        if (sourceTableMetaData == null || targetTableMetaData == null) {
            return false;
        }

        if (!StringUtils.equalsIgnoreCase(sourceTableMetaData.getSchemaName(), targetTableMetaData.getSchemaName())) {
            return false;
        }

        if (!StringUtils.equalsIgnoreCase(sourceTableMetaData.getTableName(), targetTableMetaData.getTableName())) {
            return false;
        }

        List<ColumnMetaData> sourceColumnMetaDataList = sourceTableMetaData.getColumnMetaDataList();
        List<ColumnMetaData> targetColumnMetaDataList = targetTableMetaData.getColumnMetaDataList();
        if (sourceColumnMetaDataList == null || targetColumnMetaDataList == null) {
            return false;
        }
        if (sourceColumnMetaDataList.size() != targetColumnMetaDataList.size()) {
            return false;
        }

        for (int i = 0; i < sourceColumnMetaDataList.size(); i++) {
            ColumnMetaData sourceColumnMetaData = sourceColumnMetaDataList.get(i);
            ColumnMetaData targetColumnMetaData = targetColumnMetaDataList.get(i);

            if (!StringUtils.equalsIgnoreCase(sourceColumnMetaData.getColumnName(), targetColumnMetaData.getColumnName())) {
                return false;
            }

            // https://github.com/alibaba/canal/issues/1100
            // Support int vs int(10): the types are compared by containment below, not equality.
            //
            // Fix: signedness must differ in neither direction. The previous check only rejected
            // "source unsigned, target signed"; the opposite case slipped through and then had
            // "signed" stripped from the end of "unsigned", leaving a bogus "un" suffix.
            boolean sourceUnsigned = sourceColumnMetaData.isUnsigned();
            boolean targetUnsigned = targetColumnMetaData.isUnsigned();
            if (sourceUnsigned != targetUnsigned) {
                return false; // signedness differs
            }

            // The column type text may end with the sign keyword; drop it before comparing.
            String sign = sourceUnsigned ? "unsigned" : "signed";
            String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceColumnMetaData.getColumnType(), sign).trim();
            String targetColumnType = StringUtils.removeEndIgnoreCase(targetColumnMetaData.getColumnType(), sign).trim();

            boolean columnTypeCompare = StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType)
                    || StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
            if (!columnTypeCompare) {
                return false;
            }

            // Default values are intentionally not compared.
            if (sourceColumnMetaData.isNullable() != targetColumnMetaData.isNullable()) {
                return false;
            }

            // MySQL special case: when show create has only a unique key and no primary key,
            // desc reports the unique key as the primary key, so both are treated alike here.
            boolean isSourcePrimaryKeyOrUniqueIndex = sourceColumnMetaData.isPrimaryKey() || sourceColumnMetaData.isUniqueIndex();
            boolean isTargetPrimaryKeyOrUniqueIndex = targetColumnMetaData.isPrimaryKey() || targetColumnMetaData.isUniqueIndex();
            if (isSourcePrimaryKeyOrUniqueIndex != isTargetPrimaryKeyOrUniqueIndex) {
                return false;
            }
        }

        return true;
    }

    /**
     * Asks the snapshot DAO to delete snapshots of this task by timestamp.
     *
     * @param expireTimestamp value handed to {@code deleteByTimestamp}; the scheduled job passes
     *                        the {@code snapshotExpire} setting converted from hours to seconds
     * @return the value returned by the DAO
     */
    private int snapshotExpire(int expireTimestamp) {
        final int daoResult = metaDataSnapshotDao.deleteByTimestamp(taskId, expireTimestamp);
        return daoResult;
    }


    /**
     * Tells whether a throwable looks like a unique-key violation, judged by its message text.
     * <p>
     * The message is searched for the MySQL "Duplicate entry" pattern and for the H2 unique
     * index message. Foreign-key violations raise the same kind of exception, which is why the
     * message text is inspected rather than the exception type.
     *
     * @param t throwable to inspect; may be {@code null}
     * @return {@code true} when the message matches one of the two patterns; {@code false}
     * otherwise, including when {@code t} or its message is {@code null}
     */
    public boolean isUkDuplicateException(Throwable t) {
        if (t == null) {
            return false;
        }
        // Pattern.matcher(null) throws a NullPointerException, which would mask the original
        // error inside the callers' catch blocks; an absent message simply means "no match".
        String message = t.getMessage();
        if (message == null) {
            return false;
        }
        return pattern.matcher(message).find() || h2Pattern.matcher(message).find();
    }
}
